package drds.plus.executor.cursor.cursor.impl.join;


import com.google.common.base.Joiner;
import drds.plus.executor.ExecuteContext;
import drds.plus.executor.cursor.cursor.IIndexBlockNestedLoopCursor;
import drds.plus.executor.cursor.cursor.ISortingCursor;
import drds.plus.executor.cursor.cursor.IValueFilterCursor;
import drds.plus.executor.cursor.cursor.impl.DuplicateValueRowDataLinkedList;
import drds.plus.executor.cursor.cursor_factory.CursorFactory;
import drds.plus.executor.cursor.cursor_metadata.CursorMetaDataImpl;
import drds.plus.executor.record_codec.record.Record;
import drds.plus.executor.row_values.RowValues;
import drds.plus.executor.utils.Utils;
import drds.plus.sql_process.abstract_syntax_tree.ObjectCreateFactory;
import drds.plus.sql_process.abstract_syntax_tree.execute_plan.query.Join;
import drds.plus.sql_process.abstract_syntax_tree.expression.item.column.Column;
import drds.plus.sql_process.abstract_syntax_tree.expression.item.function.BooleanFilter;
import drds.plus.sql_process.abstract_syntax_tree.expression.item.function.Function;
import drds.plus.sql_process.abstract_syntax_tree.expression.item.function.FunctionName;
import drds.plus.sql_process.abstract_syntax_tree.expression.item.function.Operation;

import java.util.*;
import java.util.Map.Entry;


/**
 * Index block nested-loop join cursor.
 * <p>
 * For a batch of left-side records it fetches the matching right-side rows either through the
 * right cursor's multi-get, or, when the right node is a subquery, by wrapping the right cursor in
 * a value-filter cursor carrying an {@code IN} condition and matching the filtered rows back to the
 * left records in memory.
 */
public class IndexBlockNestedtLoopCursor extends IndexNestedLoopMgetImpCursor implements IIndexBlockNestedLoopCursor {

    /** Factory used to wrap the right cursor in a value-filter cursor. */
    CursorFactory cursorFactory = null;

    /**
     * Creates the cursor; all arguments except {@code cursorFactory} are handed to the superclass.
     *
     * @param executeContext    execution context passed on to the superclass and to created cursors
     * @param cursorFactory     factory used later to build the value-filter cursor
     * @param join              join plan node
     * @param leftCursor        cursor over the left side
     * @param leftJoinItemList  join items of the left side
     * @param rightCursor       cursor over the right side
     * @param rightJoinItemList join items of the right side
     * @throws Exception if the superclass constructor fails
     */
    public IndexBlockNestedtLoopCursor(ExecuteContext executeContext, CursorFactory cursorFactory, Join join, //
                                       ISortingCursor leftCursor, List leftJoinItemList, ISortingCursor rightCursor, List rightJoinItemList) throws Exception {
        super(executeContext, join, leftCursor, leftJoinItemList, rightCursor, rightJoinItemList);
        this.cursorFactory = cursorFactory;
    }


    /**
     * Looks up the right-side rows for a batch of left-side records.
     *
     * @param recordList left-side join key records
     * @return map from left record to the linked list of right rows found for it
     */
    protected Map<Record, DuplicateValueRowDataLinkedList> getRightRecordToDuplicateValueRowDataLinkedListMap(List<Record> recordList) throws RuntimeException {
        // Original author's note: mget cannot be used for a subquery, because a subquery's join
        // column may be a function, and a function belongs in HAVING rather than in the value filter.
        if (this.join.getRightNode().isSubQuery()) {
            return this.getRightSideRecordToDuplicateValueMapByValueFilter(recordList);
        } else {
            return rightCursor.mgetRecordToDuplicateValueRowDataLinkedListMap(recordList, false, false);
        }
    }

    /**
     * Fetches the right-side rows using an {@code IN} filter built from the left records.
     *
     * @param leftRecordList left-side join key records
     * @return map from left record to the linked list of right rows matched to it
     * @throws UnsupportedOperationException if the join is a right outer join
     */
    protected Map<Record, DuplicateValueRowDataLinkedList> getRightSideRecordToDuplicateValueMapByValueFilter(List<Record> leftRecordList) throws RuntimeException {
        rightCursor.beforeFirst();
        if (isLeftOutJoin() && this.rightCursorMetaData == null) {
            // Left outer join without known right metadata: peek at the first right row to capture
            // its metadata, or derive it from the cursor's column metadata when there is no row.
            RowValues firstRightRow = rightCursor.next();
            if (firstRightRow != null) {
                this.rightCursorMetaData = firstRightRow.getParentCursorMetaData();
            } else {
                rightCursorMetaData = CursorMetaDataImpl.buildCursorMetaData(rightCursor.getColumnMetaDataList());
            }
            // Rewind so the value-filter cursor built below starts from the first row again.
            rightCursor.beforeFirst();
        }

        BooleanFilter booleanFilter = ObjectCreateFactory.createBooleanFilter();
        booleanFilter.setOperation(Operation.in);
        booleanFilter.setValueList(new ArrayList<Object>());
        for (Record record : leftRecordList) {
            Map<String, Object> recordMap = record.getColumnNameToValueMap();
            if (recordMap.size() == 1) {
                // Single-column IN: the filter column is the first right join item. It is the same
                // for every record, so it is only set once.
                if (booleanFilter.getColumn() == null) {
                    booleanFilter.setColumn(this.rightJoinItemList.get(0).copy());
                }
                booleanFilter.getValueList().add(recordMap.values().iterator().next());
            } else {
                // Multi-column IN: both sides are expressed as row(...) functions.
                if (booleanFilter.getColumn() == null) {
                    booleanFilter.setColumn(buildRowFunction(recordMap.keySet(), true, record));
                }
                booleanFilter.getValueList().add(buildRowFunction(recordMap.values(), false, record));
            }
        }
        IValueFilterCursor valueFilterCursor = this.cursorFactory.valueFilterCursor(executeContext, rightCursor, booleanFilter);

        // Left join and inner join are matched the same way here; right outer join is unsupported.
        if (isRightOutJoin()) {
            throw new UnsupportedOperationException("leftOutJoin:" + isLeftOutJoin() + " ; rightOutJoin:" + isRightOutJoin());
        }
        Column rightJoinItem = (Column) this.rightJoinItemList.get(0);
        Map<Record, DuplicateValueRowDataLinkedList> recordToDuplicateValueRowDataLinkedListMap = new HashMap<Record, DuplicateValueRowDataLinkedList>();
        matchRightRowsToLeftRecords(valueFilterCursor, leftRecordList, rightJoinItem, recordToDuplicateValueRowDataLinkedListMap);
        return recordToDuplicateValueRowDataLinkedListMap;
    }

    /**
     * Builds a {@code row(...)} function whose display name is the comma-joined values in
     * parentheses.
     *
     * @param values   column names (when {@code isColumn}) or plain values
     * @param isColumn {@code true} to turn each entry into a typed {@link Column} argument,
     *                 {@code false} to use the values themselves as arguments
     * @param record   record used to resolve the value type of each column name
     * @return the populated function
     */
    private Function buildRowFunction(Collection<?> values, boolean isColumn, Record record) {
        Function function = ObjectCreateFactory.createFunction();
        function.setFunctionName(FunctionName.row);
        function.setColumnName("(" + Joiner.on(",").join(values) + ")");
        if (isColumn) {
            List<Column> columnList = new ArrayList<Column>(values.size());
            for (Object value : values) {
                String name = (String) value;
                Column column = ObjectCreateFactory.createColumn();
                column.setColumnName(name);
                column.setType(record.getValueType(name));
                columnList.add(column);
            }
            function.setArgList(columnList);
        } else {
            function.setArgList(new ArrayList(values));
        }
        return function;
    }

    /**
     * Drains the filtered right cursor and attaches each row to the first left record whose first
     * column value equals the row's value for {@code rightJoinItem}.
     * <p>
     * Rows whose join value is {@code null} are skipped: they cannot satisfy an equi-join, and
     * calling {@code equals} on them would throw a {@link NullPointerException}.
     * <p>
     * NOTE(review): only the first column of each left record is compared, even when the IN filter
     * was built from several columns — confirm whether multi-column joins need full-key matching.
     */
    private void matchRightRowsToLeftRecords(IValueFilterCursor valueFilterCursor, List<Record> leftRecordList, Column rightJoinItem, Map<Record, DuplicateValueRowDataLinkedList> recordToDuplicateValueRowDataLinkedListMap) throws RuntimeException {
        RowValues rowData;
        while ((rowData = valueFilterCursor.next()) != null) {
            rowData = Utils.fromRowDataToArrayRowData(rowData);
            Object rightValue = Utils.getValue(rowData, rightJoinItem);
            if (rightValue == null) {
                continue;
            }
            for (Record leftRecord : leftRecordList) {
                Object leftValue = leftRecord.getColumnNameToValueMap().values().iterator().next();
                if (rightValue.equals(leftValue)) {
                    buildRecordToDuplicateValueRowDataLinkedListMap(rowData, leftRecord, recordToDuplicateValueRowDataLinkedListMap);
                    // Leaving the inner loop here effectively means "continue with the next right row".
                    break;
                }
            }
        }
    }

    /**
     * Adds {@code rowData} to the list stored for {@code record}: creates the list head on first
     * use, otherwise walks to the tail and appends a new node.
     */
    private void buildRecordToDuplicateValueRowDataLinkedListMap(RowValues rowData, Record record, Map<Record, DuplicateValueRowDataLinkedList> recordToDuplicateValueRowDataLinkedListMap) {
        DuplicateValueRowDataLinkedList node = recordToDuplicateValueRowDataLinkedListMap.get(record);
        if (node == null) {
            recordToDuplicateValueRowDataLinkedListMap.put(record, new DuplicateValueRowDataLinkedList(rowData));
            return;
        }
        while (node.next != null) {
            node = node.next;
        }
        node.next = new DuplicateValueRowDataLinkedList(rowData);
    }


}
